iT邦幫忙

2021 iThome 鐵人賽

DAY 2
0
自我挑戰組

從C到JS的同步非同步探索系列 第 2

[Day 2] 一個非同步案例 httpServer

  • 分享至 

  • xImage
  •  

前言

或許有些人會有所困惑, 同步非同步的實踐難在哪裡, 為甚麼要一直巴拉巴拉, 但事實上, 非同步就是創建出多條 thread 無序的佔用資源進行運算, 而當資源被多個消費者無序調用時, Race condition 等問題油然而生, 所以幾乎要考慮到絕大多數的 concurrency 議題。

今天我會用一個舉例來演示, 當不考慮 Race condition 就要同步非同步會產生的問題。以及猜測一個基本的非同步框架的實踐需要具有甚麼功能。

舉例

我利用 C++ 撰寫了一個 http server , 其功能單一, 僅可以提供本地靜態資源。但當我想要令其以非同步的方式運行時卻遇到了問題, 感覺上每次過來的 request 都是獨立的, 為甚麼當我利用 multi-thread 運行多個 server 時卻出現了問題呢 ?

(不需要特別閱讀原始碼, 看下面中文說明即可)

// 利用 thread 直接創建多個 http server
void startServer(int num) {
	httpServer server;
	server.start(num);
}

int main(int argc, char** argv)
{
	srand(time(NULL));
	thread threadList[THREADPOOL_SIZE];
	for (int i = 0; i < THREADPOOL_SIZE; i++)
		threadList[i] = thread(startServer, i);

	for (int i = 0; i < THREADPOOL_SIZE; i++)
		threadList[i].join();
}

報錯截圖
https://ithelp.ithome.com.tw/upload/images/20210902/20131164hEHxIBvfTg.png

原來一個port 只能 bind 一個 socket , 該 socket 可以用來監聽 http request。

至於同一個 port 不能擁有 2 個 socket 的原因, 我猜測出在 listen(listenSocket, SOMAXCONN) 這句, 因為 Win Socket 怕在監聽網際網路時, 有 2 個以上的消費者, 導致 Race condition 。 所以乾脆在創建 socket 時就禁止建立兩個, 強迫使用者用同一個介面來讀取網路資訊。

白話版說明 :

流程

  • 要求 OS 監聽網路孔
  • OS 把監聽到的資訊放在記憶體
  • 讀取記憶體獲取來自網路的資訊

其中在第二條 OS 把監聽到的資訊放在記憶體 若是有兩個以上的人同時把監聽到的資料放入記憶體, 因為他們彼此不能溝通導致監聽了相同內容, 放入了一樣的資料, 就會導致錯誤的結果(資料變2倍), 此即為 Race condition 為了避免這個狀況, win socket 禁止建立兩個監聽介面, 強迫使用者用同一個介面來讀取網路資訊。

最後解決方案就是在處理資訊與回傳時才 multi-thread , 建立單一的生產者來監聽 request 以及發放任務。概念如圖

https://ithelp.ithome.com.tw/upload/images/20210902/20131164l3XQIH4WpD.png
白話版說明:

  • 要求 OS 監聽網路孔
  • OS 把監聽到的資訊放在記憶體(只有一個人做)
  • 多個人去讀取記憶體, 獲取各自被分配到的部分資料, 彼此不會重複(衝突)

這樣就在不會導致資料錯誤的情況下, 讓多個 thread 以 非同步的方式處理 http request 了。

(原始碼可略過, 上方中文已闡述了我想表達的概念。)

class httpServer {
public:
	struct sockaddr_in localSocketSetting;
	SOCKET listenSocket;
	WSADATA windowsSocketData;
	vector<thread> threadPool;
	vector<vector<SOCKET>> tasksList;
	void start(int poolNumber) {
		printf("Start.......\n");
	rebuild:
		// 監聽網際網路, 讀取 request
		if (listen(listenSocket, SOMAXCONN) == SOCKET_ERROR)
			errorHandle("listen");
		forever
		{
			// 取得該自己處理的 request
			SOCKET messageSocket = getSocket(poolNumber);
			if (messageSocket == INVALID_SOCKET || messageSocket == FAIL_CODE)
				goto rebuild;
			// 處理 request
			request req = request(messageSocket);
			cout << endl << "thread " << poolNumber << " : " << req.filePath;
			if (req.messageLength == 0)
				continue;
			// 寄回 response
			int sentResult = responseClient(req, messageSocket);
			if (sentResult == 0)
				break;
			else if (sentResult == FAIL_CODE)
				goto rebuild;
		}
	}
	void accepter() {
		forever{
			struct sockaddr_in clientSocketSetting;
			int clientSocketSettingLength;
			clientSocketSettingLength = sizeof(clientSocketSetting);
			// 獲取所有 request
			SOCKET socket = accept(listenSocket, (struct sockaddr*)&clientSocketSetting, &clientSocketSettingLength);
			// 隨機分配 request 給各個消費者
			int poolNumber = rand() % THREADPOOL_SIZE;
			tasksList[poolNumber].push_back(socket);
		}
	}
	void go() {
		// 創建一個供給者, 利用單一介面監聽 request
		thread worker = thread(&httpServer::accepter, this);
		// 多個消費者, 不斷的處理 request , 回復 response
		for (int i = 0; i < THREADPOOL_SIZE; i++)
			threadPool.push_back(thread(&httpServer::start, this, i));
		for (auto& i : threadPool)
			i.join();
		worker.join();
	}
#pragma warning(disable: 26495)
	httpServer() {
		// 初始設定
		for (int i = 0; i < THREADPOOL_SIZE; i++) {
			vector<SOCKET> tasks;
			tasksList.push_back(tasks);
		}
		if (WSAStartup(MAKEWORD(2, 2), &windowsSocketData) == SOCKET_ERROR)
			errorHandle("WSAStartup");
		// Fill in the address structure
		localSocketSetting.sin_family = AF_INET;
		localSocketSetting.sin_addr.s_addr = INADDR_ANY;
		localSocketSetting.sin_port = htons(DEFAULT_PORT);
		listenSocket = socket(AF_INET, SOCK_STREAM, 0);
		if (listenSocket == INVALID_SOCKET)
			errorHandle("socket");
		// 創建 網路孔介面
		if (bind(listenSocket, (struct sockaddr*)&localSocketSetting, sizeof(localSocketSetting)) == SOCKET_ERROR)
			errorHandle("bind");
	}
	~httpServer() {
		WSACleanup();
	}
private:
	void errorHandle(string str) {
		cout << "error : " << str << endl;
		exit(FAIL_CODE);
	}
	int responseClient(request req, SOCKET& messageSocket) {
		// 略, 回傳
	}
	SOCKET getSocket(int poolNumber) {
		forever
		{
			if (tasksList[poolNumber].size() == 0)
				continue;
			SOCKET node = tasksList[poolNumber].back();
			tasksList[poolNumber].pop_back();
			return node;
		}
	}
};

int main(int argc, char** argv)
{
	srand(time(NULL));
	httpServer server;
	server.go();
}

觀念整理

綜合上述範例, 我們可以發現這個非同步 http server 為了完成非同步, 增加了以下幾個功能, 他們就是實踐基本的非同步框架所需的功能。

  1. 一種執行緒使用模式

    我們需要設計某種資料結構, 可以同時被多個執行緒 access 也不會發生錯誤, 並且以此做為介面連結只能同時被一條執行緒調用的資源以及可以被非同步運行的部分。

    以上方 httpServer 為例:

    "只能同時被一條執行緒調用的資源"是讀取 http request 的 socket , 可以被非同步運行的部分是"處理每一段 http request , 以及 response 給寄出者" 所以我整合了 1-to-Many modal 以及 thread pool 製作了一個資料結構, 使用上只要 socket 不斷地放入 http request , 就可以不斷的把任務分配進 thread pool 後利用非同步方法處理。

  2. 一種執行緒 schedule 方法

    當多個任務被分配到 thread pool 時, 若假設只有一個 processor , 實際上也只能一次做一件事, 所以需要有一個 scheduler 告知 thread pool 什麼時間要切換到哪一個 thread , 甚麼時間要切換到下一個 thread ,才能完成真正的非同步運行, 在該切換執行緒時切換, 使 CPU 使用率最大化。

    以上方 httpServer 為例:

    我僅是調用了 C++ 的 thread library 創建大量 thread , 卻沒有加入合理的調度機制, 使他們只是無腦的平均分配 processor 使用時間, 這樣明顯不能提高速度, 還會因為大量的 switch context 拖慢速度。若要給這支程式加上 schedule , 我應該想出機制能把閒置的 thread 踢出 thread pool , 把有任務沒做的 thread 加回 thread pool , 把...... 能做的事很多, 留到未來討論吧。

明日進度

明天開始會回過頭來跟大家聊聊一些理解底層框架需要具備的基本知識, 以此銜接更之後較為複雜的部份。

明天見 !


上一篇
[Day 1] 前言-為甚麼要探索?
下一篇
[Day 3] Atomic Operation
系列文
從C到JS的同步非同步探索30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言